package com.bizmda.bizsip.app.service;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.bizmda.bizsip.common.*;
import com.bizmda.bizsip.config.AbstractSinkConfig;
import com.bizmda.bizsip.config.RabbitmqSinkConfig;
import com.bizmda.bizsip.config.RestSinkConfig;
import com.bizmda.bizsip.config.SinkConfigMapping;
import com.bizmda.bizsip.app.config.RabbitmqConfig;
import com.bizmda.bizsip.service.AppLogService;
import com.open.capacity.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * Client-side API used by SIP aggregation (app) services to call sinks
 * (service adaptors), to schedule delayed store-and-forward service calls,
 * and to restore the context of an asynchronous service.
 *
 * <p>Every method works on the {@link BizMessage} held in
 * {@code BizTools.bizMessageThreadLocal}, so callers must run on a thread
 * where that thread-local has already been populated.
 */
@Slf4j
@Service
public class AppClientService {
    /** Key prefix under which asynchronous service contexts are looked up in Redis. */
    public static final String PREFIX_SIP_ASYNCLOG = "sip:asynclog:";

    @Autowired
    private RestTemplate restTemplate;
    @Autowired
    private SinkConfigMapping sinkConfigMapping;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private AppLogService appLogService;

    /**
     * Calls a sink (service adaptor) synchronously and returns its reply.
     *
     * <p>The payload is converted to a {@link JSONObject}, stored as the data of
     * the current thread's {@link BizMessage}, and that message is sent either
     * by HTTP POST (REST sinks) or by RabbitMQ request/reply (RabbitMQ sinks),
     * depending on the configured sink type.
     *
     * @param sinkId ID of the sink (service adaptor) to call
     * @param inData payload to send; converted with {@code JSONUtil.parseObj}
     * @return the sink's reply, whose data is guaranteed to be a {@link JSONObject}
     * @throws IllegalStateException if no reply message was obtained (the sink type
     *         is neither REST nor RabbitMQ, or the transport returned {@code null})
     */
    public BizMessage<JSONObject> callSink(String sinkId, Object inData) {
        log.debug("入参:{},\n{}",sinkId,BizUtils.buildJsonLog(inData));
        JSONObject jsonObject = JSONUtil.parseObj(inData);

        BizMessage<JSONObject> inMessage = BizTools.bizMessageThreadLocal.get();
        inMessage.setData(jsonObject);

        AbstractSinkConfig sinkConfig = (AbstractSinkConfig) sinkConfigMapping.getSinkConfig(sinkId);
        BizMessage<JSONObject> outMessage = null;
        if (sinkConfig.getType() == AbstractSinkConfig.TYPE_REST) {
            RestSinkConfig restSinkConfig = (RestSinkConfig) sinkConfig;
            log.debug("调用Restful服务:{}",restSinkConfig.getUrl());
            outMessage = restTemplate.postForObject(restSinkConfig.getUrl(), inMessage, BizMessage.class);
        } else if (sinkConfig.getType() == AbstractSinkConfig.TYPE_RABBITMQ) {
            RabbitmqSinkConfig rabbitmqSinkConfig = (RabbitmqSinkConfig) sinkConfig;
            log.debug("调用RabbitMQ服务:{}", rabbitmqSinkConfig.getRoutingKey());
            // The trace ID doubles as the correlation ID of the request.
            CorrelationData correlationData = new CorrelationData(inMessage.getTraceId());
            outMessage = (BizMessage<JSONObject>) this.rabbitTemplate.convertSendAndReceive(
                    rabbitmqSinkConfig.getExchange(), rabbitmqSinkConfig.getRoutingKey(), inMessage, correlationData);
        }

        // Fail with a descriptive error instead of an opaque NullPointerException
        // when no branch matched or the transport produced no reply.
        if (outMessage == null) {
            throw new IllegalStateException("Sink '" + sinkId
                    + "' returned no message (unsupported sink type, empty response or reply timeout)");
        }

        log.debug("返回:\n{}",BizUtils.buildBizMessageLog(outMessage));
        // The reply is deserialized against the raw BizMessage type, so its data
        // may not be a JSONObject yet; normalize it before handing it back.
        if (!(outMessage.getData() instanceof JSONObject)) {
            outMessage.setData(JSONUtil.parseObj(outMessage.getData()));
        }
        return outMessage;
    }

    /**
     * Schedules a store-and-forward (SAF) service call through the delay exchange.
     *
     * <p>A child transaction message is created for {@code serviceId}, a request
     * map ({@code serviceId}, {@code bizmessage}, {@code retryCount} = 0 and the
     * full {@code delayMilliseconds} array) is published as a persistent message
     * to the delay-service exchange, and an "app suspend" log entry is sent.
     *
     * @param serviceId ID of the service to call
     * @param inData payload to send; converted with {@code JSONUtil.parseObj}
     * @param delayMilliseconds delay times in milliseconds; the first entry is used
     *        as the delay of the published message (0 when none is given) and the
     *        whole array travels with the request (presumably for later retries;
     *        confirm against the consumer)
     * @return the child transaction message created for this call
     */
    public BizMessage<JSONObject> callDelayAppService(String serviceId, Object inData, int... delayMilliseconds) {
        log.debug("入参:{},{},{}",serviceId,inData,delayMilliseconds);
        JSONObject jsonObject = JSONUtil.parseObj(inData);

        BizMessage<JSONObject> inMessage = BizTools.bizMessageThreadLocal.get();
        inMessage.setData(jsonObject);

        BizMessage<JSONObject> childBizMessage = BizMessage.createChildTransaction(serviceId,inMessage);

        Map<String,Object> map = new HashMap<>(16);
        map.put("serviceId",serviceId);
        // NOTE(review): the parent message, not childBizMessage, is what gets
        // enqueued here; confirm this is intended.
        map.put("bizmessage",inMessage);
        map.put("retryCount",0);
        map.put("delayMilliseconds",delayMilliseconds);

        final int firstDelay = delayMilliseconds.length > 0 ? delayMilliseconds[0] : 0;
        rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_SERVICE_EXCHANGE, RabbitmqConfig.DELAY_SERVICE_ROUTING_KEY, map,
                message -> {
                    // Persist the message and set its delay to the first configured value.
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    message.getMessageProperties().setDelay(firstDelay);
                    return message;
                });
        this.appLogService.sendAppSuspendLog(inMessage,childBizMessage);
        log.debug("发送交易挂起日志");
        return childBizMessage;
    }

    /**
     * Restores the context of an asynchronous service from Redis and binds the
     * current message to the transaction that saved it.
     *
     * <p>If the current thread's {@link BizMessage} has no parent trace ID yet,
     * it is set to the trace ID stored with the context. If it already has one,
     * it must equal the stored trace ID.
     *
     * @param transactionKey globally unique transaction key of the asynchronous callback
     * @return the stored asynchronous service context
     * @throws BizException if no context is stored under the key, or if the current
     *         message is already bound to a different parent transaction
     */
    public Object loadAsyncContext(String transactionKey) throws BizException {
        log.debug("入参:{}",transactionKey);
        // The stored value is read back as a map with "traceId" and "context" entries.
        @SuppressWarnings("unchecked")
        Map<String, Object> map = (Map<String, Object>) this.redisUtil.get(PREFIX_SIP_ASYNCLOG + transactionKey);
        if (map == null) {
            throw new BizException(BizResultEnum.ASYNC_APP_SERVICE_CONTEXT_NOT_FOUND);
        }
        String traceId = (String) map.get("traceId");
        Object context = map.get("context");

        BizMessage<JSONObject> bizMessage = BizTools.bizMessageThreadLocal.get();
        String parentTraceId = bizMessage.getParentTraceId();
        if (parentTraceId == null) {
            log.debug("重置parentTraceId:{}",traceId);
            bizMessage.setParentTraceId(traceId);
            BizTools.bizMessageThreadLocal.set(bizMessage);
        } else if (!parentTraceId.equals(traceId)) {
            // Compare by value: the stored trace ID is a deserialized String, so a
            // reference comparison would reject a correctly bound message.
            throw new BizException(BizResultEnum.ASYNC_APP_SERVICE_PARENT_TRANCTION_BINDDING_ERROR);
        }
        log.debug("返回:{}",context);
        return context;
    }
}
